package k2platform.common.util;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Iterator;

public class Configuration extends PropertiesConfiguration {

    public static final Logger LOG = LoggerFactory.getLogger(Configuration.class);

    private Watcher watcher; //for zookeeper watcher

    public Configuration() {
        super();
        setDelimiterParsingDisabled(true);
        watcher = new Watcher();
    }

    public Configuration(String resource) throws ConfigurationException {
        super();
        setDelimiterParsingDisabled(true);
        this.load(resource);
        watcher = new Watcher();
    }

    public Configuration(File resource) throws ConfigurationException {
        super();
        setDelimiterParsingDisabled(true);
        this.load(resource);
        watcher = new Watcher();
    }

    public Configuration(URL resource) throws ConfigurationException {
        super();
        setDelimiterParsingDisabled(true);
        this.load(resource);
        watcher = new Watcher();
    }

    public byte[] toBytes() throws ConfigurationException, IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        this.save(out);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    }

    public void fromBytes(byte[] bytes) throws ConfigurationException, IOException {
        if (bytes == null) {
            LOG.warn("Bytes Array for init configuration object is null. Configuration Object init fail!");
            return;
        }
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        this.load(in);
        in.close();
    }

    public void loadFromZookeeper(ZookeeperService zooService) throws Exception {
        this.fromBytes(zooService.get(Constants.ZOOKEEPER_CONF_PATH));
    }

    public void loadFromZookeeper(String zookeeperUrl, String namespace) throws Exception {
        ZookeeperService zookeeperService = new ZookeeperService(zookeeperUrl, namespace);
        zookeeperService.start();
        loadFromZookeeper(zookeeperService);
        zookeeperService.close();
    }

    public void loadFromZookeeper(String zookeeperUrl) throws Exception {
        loadFromZookeeper(zookeeperUrl, Constants.ZOOKEEPER_NAMESPACE);
    }

    /**
     * If property does not exist, will create. If exists, will update
     *
     * @param conf
     */
    public void addConfiguration(Configuration conf) {
        Iterator<String> keys = conf.getKeys();
        while (keys.hasNext()) {
            String key = keys.next();
            this.setProperty(key, conf.getProperty(key));
        }
    }

    public Watcher getWatcher() {
        return this.watcher;
    }

    class Watcher extends NodeWatcher {

        public Watcher() {
            super();
        }

        @Override
        public void clear() {
        }

        @Override
        public void load(byte[] data) {
            try {
                Configuration tempConf = new Configuration();
                tempConf.fromBytes(data);
                addConfiguration(tempConf);
                tempConf.clear();

                LOG.info("Configurations after update:{}.", Configuration.this.toString());
            } catch (ConfigurationException | IOException e) {
                LOG.error("Error to load configuration object from byte array");
                e.printStackTrace();
            }
        }
    }

    @Override
    public String toString() {
        Iterator<String> it = this.getKeys();
        StringBuffer sb = new StringBuffer();
        sb.append("{");
        while (it.hasNext()) {
            String key = it.next();
            sb.append(key).append("=").append(this.getProperty(key)).append(", ");
        }
        // delete the last comma and space.
        if (sb.length() > 3) {
            sb.deleteCharAt(sb.length() - 2);
            sb.deleteCharAt(sb.length() - 1);
        }
        sb.append("}");

        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: Configuration ZookeeperConfFilePath DataPlatformConfFilePath");
            System.exit(1);
        }
        Configuration zookeeperConf = new Configuration(args[0]);
        ZookeeperService zs = new ZookeeperService(zookeeperConf);
        zs.start();
        Configuration dataplatformConf = new Configuration(args[1]);
        zs.create(Constants.ZOOKEEPER_CONF_PATH, dataplatformConf.toBytes());
        System.out.println("DataPlatform Configurations in " + args[1] + " have been stored in Zookeeper");

        if (!zs.checkExists(Constants.ZOOKEEPER_ASSET_PATH)) {
            zs.create(Constants.ZOOKEEPER_ASSET_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
            System.out.println("Created asset zk node");
        }
        if (!zs.checkExists(Constants.ZOOKEEPER_SCHEMA_PATH)) {
            zs.create(Constants.ZOOKEEPER_SCHEMA_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
            System.out.println("Created schema zk node");
        }
        if (!zs.checkExists(Constants.ZOOKEEPER_SCHEMA_RELOAD_PATH)) {
            zs.create(Constants.ZOOKEEPER_SCHEMA_RELOAD_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
            System.out.println("Created schema zk reload node");
        }

        zs.close();
    }
}